package org.example.realtime.jtp.dwd.log.job;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
/**
 * Flink streaming job that reads a text file line by line and prints each
 * line to standard output.
 *
 * <p>The file is registered as a source in
 * {@code FileProcessingMode.PROCESS_CONTINUOUSLY} mode, so the path is
 * re-checked periodically for as long as the job runs.
 *
 * <p>Usage: {@code Test1 [filePath]}. When no (or a blank) path argument is
 * supplied, the built-in default path is used.
 *
 * <p>Package: org.example.realtime.jtp.dwd.log.job<br>
 * Created: 2025/5/22 20:09
 *
 * @author Lianzy
 */
public class Test1 {

    /** File read when no path is passed on the command line. */
    private static final String DEFAULT_FILE_PATH =
            "D:\\workspace\\ShiXun2\\realtime-project-05lianzhenyu\\topic-log.txt";

    /** Interval, in milliseconds, between two scans of the monitored path. */
    private static final long MONITOR_INTERVAL_MS = 1000L;

    /**
     * Builds and runs the file-reading pipeline.
     *
     * @param args optional; {@code args[0]} is the path of the file to read.
     *             Falls back to the default path when absent or blank.
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Path of the file to read: command-line argument or the default.
        final String filePath = resolveFilePath(args);

        // Read the file as plain text, one record per line.
        final TextInputFormat inputFormat = new TextInputFormat(new Path(filePath));

        // Keep monitoring the path and re-scan it every MONITOR_INTERVAL_MS.
        // NOTE(review): per the Flink docs, in PROCESS_CONTINUOUSLY mode a
        // modified file is re-processed in its entirety, so appended files
        // will emit earlier lines again — confirm this is acceptable here.
        final DataStream<String> textStream = env.readFile(
                inputFormat,
                filePath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                MONITOR_INTERVAL_MS
        );
        textStream.print();

        // Submit the job.
        env.execute("Log File Stream Processing");
    }

    /**
     * Picks the input path: the first command-line argument when it is
     * present and not blank, otherwise the default path.
     */
    private static String resolveFilePath(String[] args) {
        if (args != null && args.length > 0 && args[0] != null && !args[0].trim().isEmpty()) {
            return args[0];
        }
        return DEFAULT_FILE_PATH;
    }
}
